package com.atguigu.flink.sql.join;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demonstrates a Flink SQL interval join: each row from one stream is
 * associated with rows from the other stream that arrive within a given
 * event-time range around it (here, +/- 2 seconds) and share the same id.
 */
public class Demo2_IntervalJoin {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Two socket text sources, each parsed into WaterSensor records.
        SingleOutputStreamOperator<WaterSensor> sensorStreamA =
                env.socketTextStream("hadoop102", 8888)
                   .map(new WaterSensorMapFunction());
        SingleOutputStreamOperator<WaterSensor> sensorStreamB =
                env.socketTextStream("hadoop102", 8889)
                   .map(new WaterSensorMapFunction());

        // Shared table schema: three physical columns, a processing-time
        // column (pt), an event-time column (et) derived from ts, and a
        // watermark on et with a 1 ms out-of-orderness bound.
        Schema sensorSchema = Schema.newBuilder()
                .column("id", "STRING")
                .column("ts", "BIGINT")
                .column("vc", "INT")
                .columnByExpression("pt", "proctime()")
                .columnByExpression("et", "TO_TIMESTAMP_LTZ(ts,3)")
                .watermark("et", "et - INTERVAL '0.001' SECOND")
                .build();

        // Register both streams as temporary views with the same schema.
        tableEnvironment.createTemporaryView("t1", sensorStreamA, sensorSchema);
        tableEnvironment.createTemporaryView("t2", sensorStreamB, sensorSchema);

        // Interval join: a t1 row matches t2 rows with an equal id whose
        // event time lies within 2 seconds before or after the t1 row's
        // event time.
        String sql = " select t1.id,t1.ts,t2.id,t2.ts " +
                     " from t1,t2 " +
                      " where t1.id = t2.id and " +
                     " t2.et between t1.et - INTERVAL '2' second and  t1.et + INTERVAL '2' second ";

        // Execute the continuous query and print results to stdout.
        tableEnvironment.sqlQuery(sql).execute().print();
    }
}
